package com.example.consumer;

import com.example.util.DateUtil;
import org.apache.pulsar.client.api.*;
import org.junit.Before;
import org.junit.Test;

import java.time.LocalDateTime;

/**
 * 消费定时消息
 *
 * @author linfeng
 * @date 2022/2/28 10:57
 */
public class TimedMessageConsumer {

    public static final String TOPIC_NAME = "timed-test";

    private PulsarClient pulsarClient;

    private Consumer<String> consumer;

    @Before
    public void init() throws PulsarClientException {
        pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // 定时/延时 消息的消费模式仅支持使用 Shared 模式进行消费，否则会失去定时效果（Key-shared 也不支持）。
        consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionMode(SubscriptionMode.Durable)
                // .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName("subscription1")
                .subscribe();
    }

    @Test
    public void test() throws PulsarClientException {
        while (true){
            Message<String> receive = consumer.receive();
            String message = receive.getValue();
            System.out.println(DateUtil.toStr(LocalDateTime.now()) + "收到消息：" + message);
        }
    }
}
